# ver 20220201
from mypy import now,tryx,time_maker,_print,save,load,touch_dir,file_exists,argv,argc

_time_init = now() # timestamp taken once, at module import time

# Module-private registry of per-instance data keyed by hash(instance):
# main.__init__ adds an entry (e.g. the API token), main.__del__ removes it.
_ = {'time_init':_time_init} # priv data

import os,numpy as np,pandas as pd

class main:
#############################################################################
    # Market-data helper around the tushare API: downloads minute bars,
    # caches them as per-month parquet files, and assembles wide
    # close-price frames across the whole product universe.

    def __init__(self, token_tmp='token.tmp', dbstr_tmp='db.tmp', root_data_path='../tmp/raw'):
        """Set up the tushare client and per-instance state.

        token_tmp:      text file containing the tushare API token.
        dbstr_tmp:      reserved for a DB connection string (currently unused).
        root_data_path: root directory for monthly per-code parquet dumps.
        """
        _hash = hash(self)
        _[_hash] = {}  # per-instance private storage; removed again in __del__

        # Best-effort setup: a missing token file or a failed tushare import
        # is only logged, so the instance can still be constructed offline.
        try:
            with open(token_tmp, 'r') as file:
                _token = file.read().replace('\n', '').strip()
            import tushare as ts
            if not _token:
                print('WARNING pro_api needs token')
            else:
                ts.set_token(_token)
                _[_hash]['token'] = _token
            self.ts = ts
            self.pro = ts.pro_api()
        except Exception as ex:
            _print(ex)

        # '%Y%m01' string for n months before now (day pinned to 01).
        self.get_yyyymm01 = lambda n: time_maker(0, months=-n, outfmt='%Y%m01')
        self.root_data_path = root_data_path

        from multiprocessing import Lock
        self.c_lock = Lock()  # guards self.ca (request-throttle timestamps)
        self.ca = []          # request timestamps, used with c_lock

    def __del__(self):
        # Drop this instance's private data from the module-level registry.
        _.pop(hash(self), None)

#############################################################################
    # TODO get_df_yyyy(self, ts_code, yyyy): using parallel ;)
    # e.g. for i in range(1,12*14): biz.get_df_yymm('399905.SZ', time_maker(months=-i,outfmt='%y%m') )
    def get_df_yymm(self, ts_code, yymm, y_base=200000, freq='1min', save=True, compression='gzip'):
        """Download one calendar month of bars for ts_code.

        yymm is a 4-digit 'yymm' value; y_base turns it into yyyymm
        (e.g. 200000 + 2201 -> 202201).  When save is truthy and the result
        is non-empty, the frame is written to
        {root_data_path}/{yyyymm}/{ts_code}.parquet.
        NOTE: the `save` parameter shadows the module-level save() helper,
        which this method does not use.
        """
        root_data_path = self.root_data_path
        start_date = '{}01'.format(y_base + int(yymm))
        # first day of the following month
        end_date = time_maker(months=1, date=start_date, infmt='%Y%m%d')
        _print('get_mn_df_yymm: start_date', start_date, 'end_date', end_date, ts_code)
        df = self.ts.pro_bar(ts_code=ts_code, freq=freq, start_date=start_date, end_date=end_date,)
        # guard against a failed fetch (None) as well as an empty month
        if save and df is not None and len(df) > 0:
            yyyymm = y_base + int(yymm)
            touch_dir(f'{root_data_path}/{yyyymm}/')
            df.to_parquet(f'{root_data_path}/{yyyymm}/{ts_code}.parquet', compression=compression)
        return df

    def get_list_agu(self):
        """ts_codes of currently listed stocks (stock_basic, list_status='L')."""
        return list(self.pro.query('stock_basic', list_status='L', fields='ts_code')['ts_code'].values)

    def get_list_kzz(self):
        """ts_codes from the convertible-bond issue list (cb_issue)."""
        return list(self.pro.cb_issue(fields='ts_code')['ts_code'].values)

    def get_list_fund(self):
        """ts_codes of exchange-traded funds (fund_basic, market='E')."""
        return list(self.pro.fund_basic(market='E')['ts_code'].values)

    def do_build_df(self, mth_dlt=0, resume=True):
        """Build a wide minute-close frame for one month across all products.

        mth_dlt: extra months to go back (0 -> previous month).
        resume:  when true, reuse ./tmp/{yymm}/{code}.parquet if present;
                 a negative mth_dlt always re-downloads.
        The result is also written (best effort) to var/{yymm}.parquet.
        """
        from multiprocessing.dummy import Pool  # dummy -> thread pool

        l = self.get_prod_list()

        yymm = self.get_yyyymm01(1 + mth_dlt)[2:6]

        def get_df(code):
            # One column per code: minute close prices indexed by a
            # time_maker-encoded minute key ('min').
            df = self.get_df_yymm(code, yymm)
            df[code] = df['close']  # .astype(np.float32)
            df['min'] = df['trade_time'].map(lambda v: time_maker(
                date=v, infmt='%Y-%m-%d %H:%M:%S',
                outfmt='0'
                ))
            rt = df[[code, 'min']].set_index('min')
            return rt

        from time import sleep
        ca = self.ca
        def get_by_code(v):
            # Throttle: keep at most ~400 request timestamps within the
            # last 60 seconds.  NOTE: the lock is held while sleeping, so
            # all worker threads pause together once the limit is hit.
            with self.c_lock:
                while len(ca) > 400:
                    while tryx(lambda: ca.pop(0) < now() - 60.):
                        print('pop', len(ca))
                        pass
                    print('sleep for', len(ca))
                    sleep(1)
            fnx = './tmp/{}/{}.parquet'.format(yymm, v)  # extended
            if mth_dlt < 0 or not resume:
                # forced re-download
                ca.append(now())
                _print(len(ca), now(), v, mth_dlt, resume)
                df = get_df(v)
                if len(df) > 0:
                    os.makedirs('./tmp/{}/'.format(yymm), exist_ok=True)
                    df.to_parquet(fnx, compression='gzip')
                sleep(0.2)
            elif not file_exists(fnx):  # not buffered yet -> fetch remote
                ca.append(now())
                _print(len(ca), now(), v, 'saved')
                df = get_df(v)
                if len(df) > 0:
                    os.makedirs('./tmp/{}/'.format(yymm), exist_ok=True)
                    df.to_parquet(fnx, compression='gzip')
                sleep(0.2)
            else:
                # cached on disk; tryx yields None on a read failure
                _print(len(ca), now(), v, 'loaded', yymm)
                df = tryx(lambda: pd.read_parquet(fnx))
            return df
        # todo: if mmdd dd<7 pool 1 else pool 4...

        # FIX: run the pool as a context manager so its threads are
        # terminated/joined instead of being leaked on every call.
        with Pool(8) as pool:
            dfa = pool.map(get_by_code, l)

        # pd.concat silently skips None entries (failed codes)
        rt = pd.concat(dfa, join='outer', axis=1)

        tryx(lambda: rt.to_parquet('var/{}.parquet'.format(yymm)),
             lambda v: print(type(v), str(v)[:99]))
        return rt

    def get_adj(self, ts_code='', trade_date=''):
        """Adjustment factors from tushare's adj_factor endpoint."""
        return self.pro.adj_factor(ts_code=ts_code, trade_date=trade_date, )

    # tmp solution... (merged into a single parquet; should be split by year in future)
    def sync_adj(self, trade_date, df):
        """Append trade_date's adjustment factors to df, dropping duplicates."""
        df_adj = self.get_adj(trade_date=trade_date)
        df_merge = pd.concat([df, df_adj]).drop_duplicates()
        print('sync_adj', trade_date, df_merge)
        return df_merge

    def sync_adj_loop(self, day_from=-99, day_to=1):
        """Merge adjustment factors for every day offset in
        [day_from, day_to) relative to today into hist/adj.parquet."""
        fn = 'hist/adj.parquet'
        df = tryx(lambda: pd.read_parquet(fn))  # None on the first run
        for i in range(day_from, day_to):
            df = self.sync_adj(time_maker(i, outfmt='%Y%m%d'), df)
        touch_dir('hist/')
        df.to_parquet(fn)
        return df

    # stock + stock-convertible-bond + etf + some indices...
    def get_prod_list_cache(self):
        """Cached product list: tmp/prod.json when readable, else rebuild."""
        return tryx(lambda: load('tmp/prod.json')) or self.get_prod_list()

    def get_prod_list(self, l_default=[
        '399300.SZ',  # HuShen 300
        '399305.SZ',  # ZhongZheng 500
        '399006.SZ',  # ChuangYe index
        '399001.SZ',  # ShenZhen index
        '000016.SH',  # ShangZheng 50
        '000852.SH',  # ZhongZheng 1000
        ]):
        """Full product universe: default indices + stocks + convertible
        bonds + exchange funds, de-duplicated, with codes ending in 'BJ'
        filtered out.  Saved to tmp/prod.json as a side effect.
        (l_default is a shared mutable default, but it is only read here.)
        """
        L0 = list(l_default) + self.get_list_agu() + self.get_list_kzz() + self.get_list_fund()
        L = list(set(L0))  # remove duplicates
        L = [x for x in L if not x.endswith('BJ')]
        touch_dir('tmp/')
        save('tmp/prod.json', L)
        return L

    def load_df_pq(self, tm_day_ago=28, yymmdd_to=None, use_float32=True, cache=True,):
        """Load a tm_day_ago-day window out of the monthly var/*.parquet files.

        yymmdd_to:   inclusive end day as '%y%m%d'; defaults to today.
        use_float32: downcast values to float32.
        cache:       reuse/write tmp/{end}_{days}.parquet.
        """
        tm_to_1 = None

        if yymmdd_to is not None:
            tm_to_1 = time_maker(1, date=yymmdd_to, infmt='%y%m%d', outfmt='')  # include yymmdd_to

        if tm_to_1 is None:
            tm_to_1 = time_maker(1, outfmt='')

        tm_to_0 = time_maker(-1, date=tm_to_1, infmt='', outfmt='')
        tm_from = time_maker(-tm_day_ago, date=tm_to_1, infmt='', outfmt='')

        yymmdd_from = time_maker(date=tm_from, infmt='', outfmt='%y%m%d')
        yymmdd_to_1 = time_maker(date=tm_to_1, infmt='', outfmt='%y%m%d')
        yymmdd_to_0 = time_maker(date=tm_to_0, infmt='', outfmt='%y%m%d')

        yymm_to_0 = time_maker(date=tm_to_0, infmt='', outfmt='%y%m')

        fn = 'tmp/{}_{}.parquet'.format(yymmdd_to_0, tm_day_ago)
        if cache:
            print('tryx', fn)
            df = tryx(lambda: pd.read_parquet(fn))
            if df is not None:
                return df

        # every 'yymm' month between tm_from and tm_to_0, inclusive
        ym_a = []
        tm_i = tm_from
        while True:
            ym_i = time_maker(date=tm_i, infmt='', outfmt='%y%m')
            if int(ym_i) > int(yymm_to_0):
                break
            else:
                ym_a.append(ym_i)
                tm_i = time_maker(date=tm_i, months=1, infmt='', outfmt='')
        print(ym_a)

        from pyarrow import parquet as pq, concat_tables

        # maybe change in future if any better solution....
        # NOTE(review): `promote=True` is deprecated in newer pyarrow
        # (replaced by promote_options="default") — revisit on upgrade.
        m = concat_tables([pq.read_table('var/{}.parquet'.format(yymm), memory_map=True)
            for yymm in reversed(ym_a)], promote=True)

        df = m.to_pandas().fillna(0.).sort_index()

        if use_float32:
            df = df.astype('float32')

        print('range:', yymmdd_from, '00:00,', yymmdd_to_1, '00:00')

        df = df[(df.index < tm_to_1) & (df.index > tm_from)]

        df.to_parquet(fn)

        # read back the just-written cache so this call returns exactly
        # what a later cached call would load
        return pd.read_parquet(fn)
        
if __name__ == '__main__':
    # Script mode: sync today's adjustment factors, then (re)build the
    # minute-close frame for the selected month.
    biz = main()

    # optional CLI arg: month delta passed to do_build_df (default -1)
    months_delta = int(argv[1]) if argc > 1 else -1

    # simple sync of adjustment factors for today only (offset 0)
    biz.sync_adj_loop(0)

    df = biz.do_build_df(months_delta)
    print(df)
else:
    # Import mode: expose a ready-made singleton as `module.biz`.
    biz = main()

